Seata 的简介 Seata 是阿里巴巴开源的一款高性能、易使用的分布式事务解决方案。它的核心价值在于,在微服务架构下,以极低的性能损耗和业务无感(侵入性极低)的方式,解决了跨数据库、跨服务的本地事务一致性问题,也就是实现了分布式系统下的 ACID 保证。另外,关于事务和分布式事务的理论知识可以参考之前的文章:《Spring 的本地事务》 、《分布式事务》 。Seata 官网 文档,请参考 Seata 快速开始 。
三驾马车 在 Seata 的分布式事务世界里,TC(事务协调器) 、TM(事务管理器) 和 RM(资源管理器) 是并肩作战的“三驾马车”。
TC (Transaction Coordinator) - 事务协调器。 维护全局事务的状态,负责协调并驱动全局事务的提交(Commit)或回滚(Rollback)。在生产中,它是一个独立的 Seata-Server 集群。
TM (Transaction Manager) - 事务管理器。 定义全局事务的边界。通常我们在业务的 Controller 或者 Service 方法上加一个 @GlobalTransactional 注解,这个微服务就化身成了 TM。它负责向 TC 申请开启、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器。 管辖分支事务的资源(也就是各个微服务本地的数据库连接)。它向 TC 注册分支事务,并汇报分支事务的执行状态,同时驱动本地事务的提交或回滚。
要彻底看懂它们,不能孤立地背诵定义,而是要将它们放入一个完整的分布式事务生命周期(从诞生到消亡)中。在聊它们的交互生命周期之前,我们先明确这三个核心组件在物理上的存活状态:
当一个用户点击 “立即下单”时,一条贯穿 TM、RM、TC 的分布式事务生命周期线被正式拉开。我们用 5 个核心节点来还原它的演变现场:
我们可以用一句话把 TC、TM、RM 的生命周期交织逻辑串联起来 :TM 是始作俑者,它决定了全局事务的诞生与走向(提交或回滚); RM 是前线苦力,它在一阶段随用随锁、随完随释放,并在二阶段听从召唤进行数据清算; TC 是不灭的中央最高法院,它在后台俯瞰全局,用自身的数据持久化和长连接重试机制,拉长战线死磕 RM,直到逼迫所有 RM 达成最终一致性后,才亲手为该全局事务 “盖章销户”。这也就是为什么当你二阶段参数缺失返回 false 时,TC 的生命周期卡在 “清算阶段” 无法消亡,从而在后台疯狂重试你 RM 的根本原因!
四大模式 Seata 提供了四种武器,横向对比如下:
AT 模式是怎么回滚的?(undo_log 的秘密)
在第一阶段,RM 拦截到业务 SQL(例如 update stock set count = 90 where id = 1)。
Seata 会在本地事务提交前,自动查询修改前的数据镜像(Before Image:count=100)和修改后的数据镜像(After Image:count=90),并将这两份镜像做成反向补偿 SQL,塞进本地数据库的 undo_log 表中。
两个阶段如果成功,异步删除 undo_log;如果全局失败,RM 读取 undo_log 的 Before Image,将数据人肉还原回去。
AT模式又是怎么防止脏写的?(全局锁 Global Lock) 。如果本地事务在一阶段提交了,此时另外一个没有加 Seata 的普通线程跑过来把这行数据改了,Seata 怎么防止回滚时发生覆盖脏写?
实际上,Seata 还引入了 TC 侧的 “全局锁(Global Lock)” 。在一阶段本地事务提交前,RM 必须先拿到 TC 的全局锁。如果拿不到,说明有其他事务在操作,本地事务会不断重试。回滚时,也会校验 After Image 是否与当前数据库一致,如果不一致说明发生了脏写,会立刻触发报警,从而完美解决了读写隔离问题。
为什么在项目里选 Seata AT 模式?
核心是评估开发成本与性能之间的天平。AT 模式虽然在高并发下因为全局锁存在一定的性能妥协,但它提供了对业务代码零侵入的巨大优势,能让团队快速交付;而对于个别高并发的热点扣减链路,后续可以局部重构为 TCC 模式,实施精细化的性能压榨。
AT模式的测试案例 先对这个测试项目做一下简单介绍。这个简单的「分布式下单」案例基于 Spring Boot 3.x、MyBatis-Plus、OpenFeign、Nacos 注册中心以及 Seata 2.x(AT 模式) 构建。整个链路的物理拓扑架构如下:
链路 zdemo-seata-order (下单/开启全局事务) $\rightarrow$ 远程调用 $\rightarrow$ zdemo-seata-store (扣减库存) & zdemo-seata-user (扣减余额)。
另外 zdemo-seata-api 是公共模块,用来存放接口和公共类。
项目的父项目请参考 《Spring Cloud Alibaba 基础案例》
api 公共模块 依赖配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > com.zdemo.scloud</groupId > <artifactId > zdemo-scloud-parent</artifactId > <version > 1.0-SNAPSHOT</version > </parent > <artifactId > zdemo-seata-api</artifactId > <dependencies > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies > </project >
公共接口(OpenFeign 实现):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.zdemo.scloud.api.service;@FeignClient(name = "zdemo-seata-store", contextId = "storeClient") public interface StoreFeignClient { @PostMapping("/storage/decrease") Result<Void> decrease (@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) ; } @FeignClient(name = "zdemo-seata-user", contextId = "userClient") public interface UserFeignClient { @PostMapping("/account/decrease") Result<Void> decrease (@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) ; }
公共类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Data public class OrderDTO { private Long userId; private String commodityCode; private Integer count; private BigDecimal money; } @Data @AllArgsConstructor @NoArgsConstructor public class Result <T> { private Integer code; private String msg; private T data; public static <T> Result<T> success (T data) { return new Result <>(200 , "success" , data); } public static <T> Result<T> error (String msg) { return new Result <>(500 , msg, null ); } }
order 下单模块 依赖配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 <dependencies > <dependency > <groupId > com.zdemo.scloud</groupId > <artifactId > zdemo-seata-api</artifactId > <version > ${project.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-spring-boot3-starter</artifactId > </dependency > <dependency > <groupId > com.mysql</groupId > <artifactId > mysql-connector-j</artifactId > <scope > runtime</scope > </dependency > </dependencies >
配置文件 application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 server: port: 8103 spring: application: name: zdemo-seata-order datasource: url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_order?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&allowMultiQueries=true&rewriteBatchedStatements=true&connectTimeout=3000&socketTimeout=60000 username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver type: com.zaxxer.hikari.HikariDataSource hikari: minimum-idle: 20 maximum-pool-size: 50 idle-timeout: 600000 connection-timeout: 30000 max-lifetime: 1800000 connection-test-query: SELECT 1 cloud: nacos: discovery: server-addr: 192.168 .1 .149 :8848 username: nacos password: nacos namespace: prod-zdemo cluster-name: DEFAULT mybatis-plus: configuration: map-underscore-to-camel-case: true log-impl: org.apache.ibatis.logging.stdout.StdOutImpl seata: enabled: true application-id: ${spring.application.name} tx-service-group: zdemo_tx_group service: vgroup-mapping: zdemo_tx_group: default registry: type: nacos nacos: server-addr: 192.168 .1 .149 :8848 namespace: prod-zdemo group: SEATA_GROUP username: nacos password: nacos application: seata-server data-source-proxy-mode: AT
启动类 SeataOrderApplication
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient @EnableFeignClients(basePackages = "com.zdemo.scloud.api.service") public class SeataOrderApplication { public static void main (String[] args) { SpringApplication.run(SeataOrderApplication.class, args); } }
业务代码 OrderController
1 2 3 4 5 6 7 8 9 10 11 12 @RestController @RequestMapping("/order") public class OrderController { @Resource private OrderService orderService; @PostMapping("/create") public Result<Void> create (@RequestBody OrderDTO orderDto) { orderService.createOrder(orderDto); return Result.success(null ); } }
OrderService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import com.zdemo.scloud.api.model.dto.OrderDTO;import com.zdemo.scloud.api.service.StoreFeignClient;import com.zdemo.scloud.api.service.UserFeignClient;import com.zdemo.scloud.order.entity.Order;import com.zdemo.scloud.order.mapper.OrderMapper;import io.seata.core.context.RootContext;import io.seata.spring.annotation.GlobalTransactional;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;@Slf4j @Service public class OrderService { @Resource private OrderMapper orderMapper; @Resource private StoreFeignClient storeFeignClient; @Resource private UserFeignClient userFeignClient; @GlobalTransactional(name = "zdemo-create-order-tx", rollbackFor = Exception.class) public void createOrder (OrderDTO orderDto) { log.info("==========================================================================" ); log.info("🏁 触发分布式下单大闸!Seata 全局唯一事务 XID = {}" , RootContext.getXID()); Order order = new Order (); order.setUserId(orderDto.getUserId()); order.setCommodityCode(orderDto.getCommodityCode()); order.setCount(orderDto.getCount()); order.setMoney(orderDto.getMoney()); order.setStatus(0 ); orderMapper.insert(order); log.info("Step 1 ➔ 本地一阶段订单数据初步插入成功!" ); log.info("Step 2 ➔ 正在通过 OpenFeign 调用【仓储微服务】扣减库存..." ); storeFeignClient.decrease(orderDto.getCommodityCode(), orderDto.getCount()); log.info("Step 3 ➔ 正在通过 OpenFeign 调用【账户微服务】扣减余额..." ); userFeignClient.decrease(orderDto.getUserId(), orderDto.getMoney()); order.setStatus(1 ); orderMapper.updateById(order); log.info("🎉 Step 4 ➔ 全链路业务校验通关,本地订单置为完结。全局事务准备自动二阶段 Commit!" ); log.info("==========================================================================" ); } }
OrderMapper
1 2 3 4 5 6 import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.zdemo.scloud.order.entity.Order;import org.apache.ibatis.annotations.Mapper;@Mapper public interface OrderMapper extends BaseMapper <Order> {}
Order
1 2 3 4 5 6 7 8 9 10 11 @Data @TableName("t_order") public class Order { @TableId(type = IdType.AUTO) private Long id; private Long userId; private String commodityCode; private Integer count; private BigDecimal money; private Integer status; }
store 库存模块 依赖配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 <dependencies > <dependency > <groupId > com.zdemo.scloud</groupId > <artifactId > zdemo-seata-api</artifactId > <version > ${project.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-spring-boot3-starter</artifactId > </dependency > <dependency > <groupId > com.mysql</groupId > <artifactId > mysql-connector-j</artifactId > <scope > runtime</scope > </dependency > </dependencies >
配置文件 application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 server: port: 8101 spring: application: name: zdemo-seata-store datasource: url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_store?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&allowMultiQueries=true&rewriteBatchedStatements=true&connectTimeout=3000&socketTimeout=60000 username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver type: com.zaxxer.hikari.HikariDataSource hikari: minimum-idle: 20 maximum-pool-size: 50 idle-timeout: 600000 connection-timeout: 30000 max-lifetime: 1800000 connection-test-query: SELECT 1 cloud: nacos: discovery: server-addr: 192.168 .1 .149 :8848 username: nacos password: nacos namespace: prod-zdemo cluster-name: DEFAULT mybatis-plus: configuration: map-underscore-to-camel-case: true log-impl: org.apache.ibatis.logging.stdout.StdOutImpl seata: enabled: true application-id: ${spring.application.name} tx-service-group: zdemo_tx_group service: vgroup-mapping: zdemo_tx_group: default registry: type: nacos nacos: server-addr: 192.168 .1 .149 :8848 namespace: prod-zdemo group: SEATA_GROUP username: nacos password: nacos application: seata-server data-source-proxy-mode: AT
启动类 1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class SeataStoreApplication { public static void main (String[] args) { SpringApplication.run(SeataStoreApplication.class, args); } }
业务类 StorageController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j @RestController @RequestMapping("/storage") public class StorageController { @Resource private StorageService storageService; @PostMapping("/decrease") public Result<Void> decrease (@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) { log.info("▶▶▶ [仓储服务] 收到扣减库存请求. XID: {}" , RootContext.getXID()); storageService.decrease(commodityCode, count); return Result.success(null ); } }
StorageService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Service public class StorageService { @Resource private StorageMapper storageMapper; @Transactional(rollbackFor = Exception.class) public void decrease (String commodityCode, Integer count) { UpdateWrapper<Storage> updateWrapper = new UpdateWrapper <>(); updateWrapper.eq("commodity_code" , commodityCode) .setSql("count = count - " + count); storageMapper.update(null , updateWrapper); } }
StorageMapper
1 2 3 4 5 6 import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.zdemo.scloud.store.entity.Storage;import org.apache.ibatis.annotations.Mapper;@Mapper public interface StorageMapper extends BaseMapper <Storage> {}
Storage
1 2 3 4 5 6 7 8 @Data @TableName("t_storage") public class Storage { @TableId private Long id; private String commodityCode; private Integer count; }
user 账户模块 依赖配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 <dependencies > <dependency > <groupId > com.zdemo.scloud</groupId > <artifactId > zdemo-seata-api</artifactId > <version > ${project.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-spring-boot3-starter</artifactId > </dependency > <dependency > <groupId > com.mysql</groupId > <artifactId > mysql-connector-j</artifactId > <scope > runtime</scope > </dependency > </dependencies >
配置文件 application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 server: port: 8102 spring: application: name: zdemo-seata-user datasource: url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_user?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&allowMultiQueries=true&rewriteBatchedStatements=true&connectTimeout=3000&socketTimeout=60000 username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver type: com.zaxxer.hikari.HikariDataSource hikari: minimum-idle: 20 maximum-pool-size: 50 idle-timeout: 600000 connection-timeout: 30000 max-lifetime: 1800000 connection-test-query: SELECT 1 cloud: nacos: discovery: server-addr: 192.168 .1 .149 :8848 username: nacos password: nacos namespace: prod-zdemo cluster-name: DEFAULT mybatis-plus: configuration: map-underscore-to-camel-case: true log-impl: org.apache.ibatis.logging.stdout.StdOutImpl seata: enabled: true application-id: ${spring.application.name} tx-service-group: zdemo_tx_group service: vgroup-mapping: zdemo_tx_group: default registry: type: nacos nacos: server-addr: 192.168 .1 .149 :8848 namespace: prod-zdemo group: SEATA_GROUP username: nacos password: nacos application: seata-server data-source-proxy-mode: AT
启动类 1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class SeataUserApplication { public static void main (String[] args) { SpringApplication.run(SeataUserApplication.class, args); } }
业务类 AccountController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j @RestController @RequestMapping("/account") public class AccountController { @Resource private AccountService accountService; @PostMapping("/decrease") public Result<Void> decrease (@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) { log.info("▶▶▶ [账户服务] 收到扣减余额请求. XID: {}" , RootContext.getXID()); accountService.decrease(userId, money); return Result.success(null ); } }
AccountService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j @Service public class AccountService { @Resource private AccountMapper accountMapper; @Transactional public void decrease (Long userId, BigDecimal money) { if (money.compareTo(new BigDecimal ("500" )) > 0 ) { log.error("[埋点激活] 触发预设余额不足惩罚,强行抛出运行时异常引发全局回滚!" ); throw new RuntimeException ("【账户中心】账户余额严重不足,扣款失败!" ); } UpdateWrapper<Account> updateWrapper = new UpdateWrapper <>(); updateWrapper.eq("user_id" , userId) .setSql("money = money - " + money); accountMapper.update(null , updateWrapper); } }
AccountMapper
1 2 3 4 5 6 import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.zdemo.scloud.user.entity.Account;import org.apache.ibatis.annotations.Mapper;@Mapper public interface AccountMapper extends BaseMapper <Account> {}
Account
1 2 3 4 5 6 7 8 @Data @TableName("t_account") public class Account { @TableId private Long id; private Long userId; private BigDecimal money; }
业务相关的库表 订单数据库 zdemo_seata_order:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 CREATE DATABASE IF NOT EXISTS `zdemo_seata_order` DEFAULT CHARACTER SET utf8mb4;USE `zdemo_seata_order`; CREATE TABLE `t_order` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `user_id` bigint (20 ) DEFAULT NULL COMMENT '用户ID' , `commodity_code` varchar (255 ) DEFAULT NULL COMMENT '商品编码' , `count` int (11 ) DEFAULT '0' COMMENT '购买数量' , `money` decimal (11 ,2 ) DEFAULT '0.00' COMMENT '订单金额' , `status` int (11 ) DEFAULT '0' COMMENT '订单状态:0-创建中,1-已完结' , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4; CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8;
商品仓储数据库 zdemo_seata_store:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 CREATE DATABASE IF NOT EXISTS `zdemo_seata_store` DEFAULT CHARACTER SET utf8mb4;USE `zdemo_seata_store`; CREATE TABLE `t_storage` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `commodity_code` varchar (255 ) DEFAULT NULL COMMENT '商品编码' , `count` int (11 ) DEFAULT '0' COMMENT '总库存' , PRIMARY KEY (`id`), UNIQUE KEY `commodity_code` (`commodity_code`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4; INSERT INTO `t_storage` (`id`, `commodity_code`, `count`) VALUES (1 , 'Owlias-1.3' , 100 );CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8;
用户账户数据库 zdemo_seata_user:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 CREATE DATABASE IF NOT EXISTS `zdemo_seata_user` DEFAULT CHARACTER SET utf8mb4;USE `zdemo_seata_user`; CREATE TABLE `t_account` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `user_id` bigint (20 ) DEFAULT NULL COMMENT '用户ID' , `money` decimal (11 ,2 ) DEFAULT '0.00' COMMENT '账户余额' , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4; INSERT INTO `t_account` (`id`, `user_id`, `money`) VALUES (1 , 1 , 1000.00 );CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8;
关于如何查看 rollback_info 字段的说明:
1 2 3 4 select CONVERT (rollback_info USING utf8) from zdemo_seata_order.undo_log where id= xxx;select CONVERT (rollback_info USING utf8) from zdemo_seata_store.undo_log where id= xxx;
结果形如:
1 2 3 4 { "@class" : "io.seata.rm.datasource.undo.BranchUndoLog" , "xid" : "192.168.1.149:8091:5450146059598175958" , "branchId" : 5450146059598175973 , "sqlUndoLogs" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.undo.SQLUndoLog" , "sqlType" : "INSERT" , "tableName" : "t_order" , "beforeImage" : { "@class" : "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords" , "tableName" : "t_order" , "rows" : [ "java.util.ArrayList" , [ ] ] } , "afterImage" : { "@class" : "io.seata.rm.datasource.sql.struct.TableRecords" , "tableName" : "t_order" , "rows" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.sql.struct.Row" , "fields" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "id" , "keyType" : "PRIMARY_KEY" , "type" : -5 , "value" : [ "java.lang.Long" , 23 ] } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "user_id" , "keyType" : "NULL" , "type" : -5 , "value" : [ "java.lang.Long" , 1 ] } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "commodity_code" , "keyType" : "NULL" , "type" : 12 , "value" : "Owlias-1.3" } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "count" , "keyType" : "NULL" , "type" : 4 , "value" : 20 } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "money" , "keyType" : "NULL" , "type" : 3 , "value" : [ "java.math.BigDecimal" , 2000.00 ] } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "status" , "keyType" : "NULL" , "type" : 4 , "value" : 0 } ] ] } ] ] } } ] ] } { "@class" : "io.seata.rm.datasource.undo.BranchUndoLog" , "xid" : "192.168.1.149:8091:5450146059598176633" , "branchId" : 5450146059598176650 , "sqlUndoLogs" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.undo.SQLUndoLog" , "sqlType" : "UPDATE" , "tableName" : "t_storage" , "beforeImage" : { "@class" : "io.seata.rm.datasource.sql.struct.TableRecords" , "tableName" : "t_storage" , "rows" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.sql.struct.Row" , "fields" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "id" , "keyType" : "PRIMARY_KEY" , "type" : -5 , "value" : [ "java.lang.Long" , 1 ] } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "count" , "keyType" : "NULL" , "type" : 4 , "value" : 94 } ] ] } ] ] } , "afterImage" : { "@class" : "io.seata.rm.datasource.sql.struct.TableRecords" , "tableName" : "t_storage" , "rows" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.sql.struct.Row" , "fields" : [ "java.util.ArrayList" , [ { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "id" , "keyType" : "PRIMARY_KEY" , "type" : -5 , "value" : [ "java.lang.Long" , 1 ] } , { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "count" , "keyType" : "NULL" , "type" : 4 , "value" : 92 } ] ] } ] ] } } ] ] }
搭建 seata 服务端 项目中需要安装 Seata 服务端,也就是 TC 事务协调器。只在微服务里引入 seata-spring-boot-starter 只是让你的代码变成了 TM(事务管理器) 和 RM(资源管理器)。如果没有独立运行的 TC(Seata 服务端) 在后台坐镇,整个分布式事务在运行时连最基本的 “花名册注册 ”和 “全局锁发放 ”都无法完成。
在本地测试或沙盒环境里,大家喜欢用 File 模式(无脑存本地文件,单机运行)。但在真实的生产环境中,为了应对高并发、高可用(HA)以及容灾,通常需要:
TC 必须集群化(Clustering) :生产线至少部署 2~3 台 Seata-Server 节点组成集群,挂载到同一个 Nacos 注册中心上,实现无状态的负载均衡与故障转移。
存储模式必须是 DB 或 Redis 模式 :绝对禁止 File 模式!多台 Seata-Server 必须共享同一个外部数据库(zdemo_seata_tc)或者高性能 Redis 哨兵/集群,用来共享保存全局事务状态和全局行锁。
配置统一交由 Nacos Config 管辖 :Seata 的所有动态调优参数(如锁重试次数、心跳间隔)全部人肉持久化在 Nacos 配置中心,绝不写死在本地。
这里我们 seata 数据库采用 mysql 来实现,配置文件简单起见先使用本地的,seaa服务注册到 nacos,方便 TC 客户端的调用。
准备 TC 库表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 CREATE DATABASE IF NOT EXISTS `zdemo_seata_tc` DEFAULT CHARACTER SET utf8mb4;USE `zdemo_seata_tc`; CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR (128 ) NOT NULL , `transaction_id` BIGINT NOT NULL , `status` TINYINT NOT NULL , `application_id` VARCHAR (32 ) DEFAULT NULL , `transaction_service_group` VARCHAR (32 ) DEFAULT NULL , `transaction_name` VARCHAR (128 ) DEFAULT NULL , `timeout` INT DEFAULT NULL , `begin_time` BIGINT DEFAULT NULL , `application_data` VARCHAR (2000 ) DEFAULT NULL , `gmt_create` DATETIME DEFAULT NULL , `gmt_modified` DATETIME DEFAULT NULL , PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4; CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL , `xid` VARCHAR (128 ) NOT NULL , `transaction_id` BIGINT DEFAULT NULL , `resource_group_id` VARCHAR (32 ) DEFAULT NULL , `resource_id` VARCHAR (512 ) DEFAULT NULL , `branch_type` VARCHAR (8 ) DEFAULT NULL , `status` TINYINT DEFAULT NULL , `client_id` VARCHAR (64 ) DEFAULT NULL , `application_data` VARCHAR (2000 ) DEFAULT NULL , `gmt_create` DATETIME DEFAULT NULL , `gmt_modified` DATETIME DEFAULT NULL , PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4; CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR (128 ) NOT NULL , `xid` VARCHAR (128 ) DEFAULT NULL , `transaction_id` BIGINT DEFAULT NULL , `branch_id` BIGINT DEFAULT NULL , `resource_id` VARCHAR (512 ) DEFAULT NULL , `table_name` VARCHAR (32 ) DEFAULT NULL , `pk` VARCHAR (36 ) DEFAULT NULL , `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking' , `gmt_create` DATETIME DEFAULT NULL , `gmt_modified` DATETIME DEFAULT NULL , PRIMARY KEY (`row_key`), KEY `idx_status` (`status`), KEY `idx_branch_id` (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4; CREATE TABLE IF NOT EXISTS `distributed_lock`( `lock_key` VARCHAR (128 ) NOT NULL , `lock_value` VARCHAR (128 ) NOT NULL , `expire` BIGINT NOT NULL , PRIMARY KEY (`lock_key`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT IGNORE INTO `distributed_lock` (`lock_key`, `lock_value`, `expire`) VALUES ('AsyncCommitting' , ' ' , 0 ), ('RetryCommitting' , ' ' , 0 ), ('RetryRollbacking' , ' ' , 0 ), ('TxTimeoutCheck' , ' ' , 0 );
安装 Seata-Server 前往 Seata 官方下载 页面,下载适配的二进制压缩包(例如 seata-server-2.0.0.tar.gz),并解压到你的物理服务器(或 CentOS VM)中:
1 2 $ tar -zxvf seata-server-2.0.0.tar.gz -C /usr/local/ $ cd /usr/local/seata/conf
注意:在官方给出的 2.0.0 版本中,lib 路径下 jdbc 有多个版本,需要根据使用的 mysql 数据库版本选择并删掉哪些不用的版本,不然会不断有 Caused by: java.sql.SQLException: Unknown system variable ‘query_cache_size’ 的报错。
修改 Seata 服务端核心配置文件:在 conf 目录下,2.x 版本已经极简化。我们直接编辑 application.yml(或者 application.properties),把它的运行模式由默认的单机文件版人肉魔改为生产级的 Nacos 发现 + DB 存储模式(可以参考官方在同一文件夹中的示例文件 application.example.yml,在此基础上修改),我的配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${log.home:${user.home}/logs/seata} console: user: username: seata password: seata seata: security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/** config: type: file registry: type: nacos preferred-networks: 192.168 .* nacos: application: seata-server server-addr: 192.168 .1 .149 :8848 group: SEATA_GROUP namespace: prod-zdemo cluster: default username: nacos password: nacos store: mode: db session: mode: db lock: mode: db file: dir: sessionStore max-branch-session-size: 16384 max-global-session-size: 512 file-write-buffer-cache-size: 16384 session-reload-read-size: 100 flush-disk-mode: async db: datasource: druid db-type: mysql driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.1.251:3306/zdemo_seata_tc?useSSL=false&serverTimezone=Asia/Shanghai user: root password: 123456 min-conn: 10 max-conn: 100 global-table: global_table branch-table: branch_table lock-table: lock_table distributed-lock-table: distributed_lock query-limit: 1000 max-wait: 5000 metrics: enabled: false registry-type: compact exporter-list: prometheus exporter-prometheus-port: 9898
启动并查看日志:
1 2 $ /usr/local/seata/bin/seata-server.sh -h 192.168.1.149 -p 8091 -m db $ tail -f /root/logs/seata/seata-server.8091.all.log
前端控制台页面(用户名密码 seata/seata):
1 $ curl http://192.168.1.149:7091
XA 模式的实现案例 在微服务分布式事务中,XA 模式是一种强一致性(Strong Consistency)的解决方案。它与上面 AT 模式的不同点在于:
如果你现在的场景是银行转账、涉及金钱审计等对账严格、不允许半点中间状态暴露的业务,则建议使用 XA 模式;如果是高并发的电商下单、商品扣库存,建议保留原有的 AT 模式。
在 Spring Cloud 2023 + Seata 2.0 架构中,将默认的 AT 模式切换为 XA 模式非常简单,核心逻辑在于更换数据源代理类型。它的底层依赖于数据库本身对 XA 规范(两阶段提交,2PC)的原生支持(如 MySQL 5.7+ 默认支持)。XA 模式的两阶段提交运转逻辑:
第一阶段(Prepare): 业务微服务(RM)执行业务 SQL,但不提交事务。Seata 代理数据源会向数据库发送 XA START、XA END 和 XA PREPARE。此时数据库资源(如行锁)会被真正锁住。
第二阶段(Commit/Rollback): 事务管理器(TM)根据所有微服务的执行结果,通知 Seata TC。TC 如果下发 Commit,各个数据库执行 XA COMMIT 真正落盘并释放锁;如果任何一个服务失败,TC 下发 Rollback,各个数据库执行 XA ROLLBACK 回滚。
由于我们使用的是 Seata 2.0.0,官方已经极大地简化了模式切换。你只需要在订单微服务(Order)和 库存微服务(Store)以及账户微服务(User)中做以下两步调整:
第一步:在 application.yml 中修改数据源代理模式
1 2 seata: data-source-proxy-mode: XA
第二步:确保数据库驱动支持 XA 。因为 XA 模式是由数据库驱动直接与 MySQL 通信完成的,这就要求我们前面大换血时引入的 com.mysql.cj.jdbc.Driver 必须到位。同时,确保微服务连接的 MySQL 账号拥有 XA 事务的权限:
1 2 3 GRANT XA_RECOVER_ADMIN ON * .* TO '你的数据库用户名' @'%' ;FLUSH PRIVILEGES;
XA 模式对业务代码是完全无侵入的。你在 zdemo-seata-order 的 Service 方法上原有的 @GlobalTransactional 注解不需要做任何修改:
1 2 3 4 @GlobalTransactional(name = "zdemo-create-order-tx", rollbackFor = Exception.class) public void createOrder (OrderDTO orderDto) { }
TCC 模式的实现案例 代码注意事项 在分布式事务中,TCC 模式 (Try-Confirm-Cancel) 属于典型的业务层两阶段提交。它与 AT、XA 模式最大的不同在于:Seata 框架不再帮你自动代理数据源、不再帮你自动生成回滚日志,而是把事务的控制权完全交给了你的业务代码。正因为它的自由度极高,所以在实际编码时,需要考虑几个极其致命的 “隐形陷阱”。
业务悬挂(Anti-Suspension)
现象:某一微服务的 Try 请求因为网络拥堵严重超时,分布式事务管理器(TM)以为该节点失败了,于是对整个事务下发了 Cancel 指令。结果 Cancel 执行完后,那个迟到的 Try 请求才真正到达服务器。
后果:此时如果没有防御,Try 方法会再次执行,成功锁住一笔业务资源。但由于 Cancel 已经走完了,这笔资源将永远无法被释放(发生内存/资源泄露)。
解法:在执行 Try 之前,必须先去交易记录表检查,当前全局事务 ID(XID)是否已经执行过 Cancel 或 Confirm。如果是则直接拦截并报错,绝不执行 Try。
空回滚(Null-Cancel)
现象:分布式事务在执行 Try 的时候,因为网络抖动或者服务刚启动直接崩溃,导致 Try 方法压根就没有被调用成功。这时候全局事务失败,TC 会顺理成章地下发 Cancel 指令。
后果:Cancel 方法此时被调用了,但它发现本地啥都没有处理过。如果你在 Cancel 里写的是 update account set balance = balance + 100(无脑加回),就会平白无故多给用户退了 100 块钱。
解法:Cancel 接口被触发时,首先要检查 Try 阶段的业务日志/事务记录是否存在。如果不存在,说明 Try 根本没成功,此时 Cancel 应当什么都不做,直接返回成功(即空回滚)。
幂等性(Idempotence)
现象:网络超时触发了 RPC 框架的自动重试,或者 Seata TC 发送的 Confirm / Cancel 请求由于网络丢包,在未收到 ACK 的情况下重复发送。
后果:如果 Confirm 连续扣了两次钱,或者 Cancel 连续加了两次库存,业务数据直接崩盘。
解法设计一张事务控制表(内含 xid, branch_id, state)。利用数据库的唯一索引(Unique Key)或者 state 的 CAS 状态机更新(例如:update tcc_log set state = ‘CONFIRMED’ where xid = ? and state = ‘TRY’)。只有更新成功的那个请求才能真正执行二阶段的业务。
数据并发隔离与资产保护(Data Isolation)
现象:在 AT 或 XA 模式下,有数据库行锁或 Seata 全局锁帮你把资源死死扣住,外界无法修改。但 TCC 是一阶段直接把本地事务提交了。
后果:比如用户的账户余额有 100 元,Try 阶段检查通过,准备买 80 元的东西,但 Try 并没有真正扣钱,只是在内存或一个冻结表里记录了一下。在 Confirm 还没下发前,用户通过另一个普通线程迅速提现了 50 元(此时余额变 50)。等 Confirm 真正下发执行扣减 80 元时,账户直接变成了 -30 元(发生业务资损、负资产)。
解法:TCC 的 Try 阶段绝对不能仅仅做 SELECT 校验,必须引入“冻结资金”或“预留库存”的物理概念。
不要直接在 Try 里看 balance > 80。
而是应该在主表里 balance = balance - 80, freeze_balance = freeze_balance + 80,或者引入一张单独的 freeze_record 表,把资源在 Try 阶段通过数据库行锁硬锁住。
性能与脏数据清理(异步与降级机制)
现象:如果分布式事务的二阶段(Confirm 或 Cancel)因为目标数据库宕机长期失败,Seata 会按照指数退避算法不断发起重试。
后果:如果重试了 100 次数据库还没好,这笔被“预留”或“冻结”的业务资源(比如某张电影票、某笔资金)就会被无限期挂起,直接影响后续正常用户的购买。
解法:超时自动解锁,冻结表或预留表里必须带有 expire_time(过期时间字段);或者编写异步的监控脚本,当发现某笔 TCC 事务处于 TRY 状态超过 1 小时,且 TC 没有下文时,自动触发报警或者根据业务策略强行进行本地 Cancel 冲正。
具体实现案例 将上述 “下单-扣库存-扣余额” 案例改造为 TCC 模式,我们需要对代码结构进行重构。为了完美防御空回滚、幂等性、业务悬挂,并做到数据并发隔离,我们需要:
引入 TCC 专属的控制日志表(或利用状态机):用来人肉记录每一个分支事务的执行状态。
重构业务逻辑做资源预留:
库存服务 :不再无脑扣减 count,而是扣减 count 的同时增加 freeze_count(冻结库存)。
账户服务 :不再无脑扣减 money,而是扣减 money 的同时增加 freeze_money(冻结余额)。
为了防范异常,我们需要一张 分布式事务控制表 (也可以两边服务各建一张)。
数据库表的改动 1 2 3 4 5 6 7 8 9 10 11 12 13 ALTER TABLE zdemo_seata_store.t_storage ADD COLUMN `freeze_count` int (11 ) NOT NULL DEFAULT '0' COMMENT '冻结库存' AFTER `count`;ALTER TABLE zdemo_seata_user.t_account ADD COLUMN `freeze_money` decimal (14 ,2 ) NOT NULL DEFAULT '0.00' COMMENT '冻结金额' AFTER `money`;CREATE TABLE IF NOT EXISTS `tcc_transaction_log` ( `xid` VARCHAR (128 ) NOT NULL COMMENT '全局事务XID' , `branch_id` BIGINT NOT NULL COMMENT '分支事务ID' , `state` INT NOT NULL COMMENT '状态: 1-TRY, 2-CONFIRM, 3-CANCEL' , PRIMARY KEY (`xid`, `branch_id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4;
接口层设计 TCC 必须定义在 Interface 上。Seata 要求 TCC 的二阶段方法必须通过 @LocalTCC 或在 Feign 接口上通过 @TwoPhaseBusinessAction 进行声明。
库存服务 TCC 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.zdemo.scloud.store.service;import io.seata.rm.tcc.api.BusinessActionContext;import io.seata.rm.tcc.api.BusinessActionContextParameter;import io.seata.rm.tcc.api.TwoPhaseBusinessAction;public interface StorageTccService { @TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "confirm", rollbackMethod = "cancel") boolean prepareDecrease (BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "count") Integer count) ; boolean confirm (BusinessActionContext actionContext) ; boolean cancel (BusinessActionContext actionContext) ; }
账户服务 TCC 接口:
1 2 3 4 5 6 7 8 9 10 11 public interface AccountTccService { @TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "confirm", rollbackMethod = "cancel") boolean prepareDecrease (BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "userId") Long userId, @BusinessActionContextParameter(paramName = "money") BigDecimal money) ; boolean confirm (BusinessActionContext actionContext) ; boolean cancel (BusinessActionContext actionContext) ; }
库存接口实现层 StorageTccServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 package com.zdemo.scloud.store.service;import com.zdemo.scloud.store.entity.TccTransactionLog;import com.zdemo.scloud.store.mapper.StorageTccMapper;import com.zdemo.scloud.store.mapper.TccTransactionLogMapper;import io.seata.core.context.RootContext;import io.seata.rm.tcc.api.BusinessActionContext;import io.seata.rm.tcc.api.LocalTCC;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.springframework.dao.DuplicateKeyException;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;@Slf4j @Service @LocalTCC public class StorageTccServiceImpl implements StorageTccService { @Resource private StorageTccMapper storageTccMapper; @Resource private TccTransactionLogMapper tccTransactionLogMapper; @Override @Transactional(rollbackFor = Exception.class) public boolean prepareDecrease (BusinessActionContext actionContext, String commodityCode, Integer count) { String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID(); Long branchId = actionContext != null ? actionContext.getBranchId() : null ; log.info("[Storage TCC - Try] 开始锁定库存. XID: {}, BranchId: {}" , xid, branchId); TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (logExist != null && logExist.getState() == 3 ) { throw new RuntimeException ("[Storage TCC - Try] 检测到当前分支事务已发生 Cancel 悬挂,强行终止 Try 动作!" ); } int affectedRows = storageTccMapper.freezeStorage(commodityCode, count); if (affectedRows == 0 ) { throw new RuntimeException ("【仓储中心】库存不足,锁定失败!" ); } TccTransactionLog tccLog = new TccTransactionLog (); tccLog.setXid(xid); tccLog.setBranchId(branchId); tccLog.setState(1 ); tccTransactionLogMapper.insert(tccLog); if (actionContext != null ) { actionContext.addActionContext("commodityCode" , commodityCode); actionContext.addActionContext("count" , count); } return true ; } @Override @Transactional(rollbackFor = Exception.class) public boolean confirm (BusinessActionContext actionContext) { String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID(); Long branchId = actionContext != null ? actionContext.getBranchId() : null ; Object commodityCodeObj = actionContext != null ? actionContext.getActionContext("commodityCode" ) : null ; Object countObj = actionContext != null ? actionContext.getActionContext("count" ) : null ; if (commodityCodeObj == null || countObj == null ) { log.warn("[Storage TCC - Confirm] 上下文参数缺失" ); return false ; } TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (logExist != null && logExist.getState() == 2 ) { log.info("[Storage TCC - Confirm] 分支已确认过,幂等放行。" ); return true ; } String commodityCode = commodityCodeObj.toString(); Integer count = Integer.parseInt(countObj.toString()); int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 2 , 1 ); if (updatedRows > 0 ) { storageTccMapper.deductFreezeStorage(commodityCode, count); log.info("[Storage TCC - Confirm] 扣减冻结库存成功,事务完结落盘。" ); return true ; } else { TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (doubleCheck != null && doubleCheck.getState() == 2 ) { log.info("[Storage TCC - Confirm] 并发占坑失败,但由于最终状态已为 CONFIRM,允许幂等放行。" ); return true ; } log.error("[Storage TCC - Confirm] 状态扣减失败,且未见成功痕迹,要求 TC 触发重试!" ); return false ; } } @Override @Transactional(rollbackFor = Exception.class) public boolean cancel (BusinessActionContext actionContext) { String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID(); Long branchId = actionContext != null ? actionContext.getBranchId() : null ; Object commodityCodeObj = actionContext != null ? actionContext.getActionContext("commodityCode" ) : null ; Object countObj = actionContext != null ? actionContext.getActionContext("count" ) : null ; if (commodityCodeObj == null || countObj == null ) { log.warn("[Storage TCC - Cancel] 上下文参数缺失,极可能一阶段在数据序列化前已超时,触发空回滚兜底。" ); } TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (logExist == null ) { try { TccTransactionLog tccLog = new TccTransactionLog (xid, branchId, 3 ); tccTransactionLogMapper.insert(tccLog); log.info("[Storage TCC - Cancel] 空回滚防御成功,成功插入 CANCEL 占位记录。" ); } catch (DuplicateKeyException e) { log.warn("[Storage TCC - Cancel] 并发空回滚 insert 冲突,触发底层锁幂等防线,直接放行。" ); } return true ; } if (logExist.getState() == 3 || logExist.getState() == 2 ) { log.info("[Storage TCC - Cancel] 检测到当前分支事务已处理过(state={}),直接幂等放行。" , logExist.getState()); return true ; } if (logExist.getState() == 1 ) { if (commodityCodeObj == null || countObj == null ) { log.error("[Storage TCC - Cancel] 致命级资损隐患:本地有 Try 记录,但分布式上下文丢失参数,库存无法安全解冻!" ); throw new RuntimeException ("TCC 异步参数丢失,拒绝空回滚,等待 TC 重试或人工介入!" ); } String commodityCode = commodityCodeObj.toString(); Integer count = Integer.parseInt(countObj.toString()); int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 3 , 1 ); if (updatedRows > 0 ) { storageTccMapper.unfreezeStorage(commodityCode, count); log.info("[Storage TCC - Cancel] 真正反向解冻资产成功,分布式事务安全回滚。" ); return true ; } else { TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (doubleCheck != null && doubleCheck.getState() == 3 ) { log.info("[Storage TCC - Cancel] 并发逆操作冲突,但当前状态已被成功置为 CANCEL,直接放行。" ); return true ; } log.error("[Storage TCC - Cancel] CAS 回滚占坑失败,要求 TC 重新下发 Cancel 重试!" ); return false ; } } log.error("[Storage TCC - Cancel] 遇到了未知的事务控制表状态: {}" , logExist.getState()); return false ; } }
StorageTccMapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Mapper public interface StorageTccMapper { @Update("UPDATE t_storage SET count = count - #{count}, freeze_count = freeze_count + #{count} " + "WHERE commodity_code = #{commodityCode} AND count >= #{count}") int freezeStorage (@Param("commodityCode") String commodityCode, @Param("count") Integer count) ; @Update("UPDATE t_storage SET freeze_count = freeze_count - #{count} " + "WHERE commodity_code = #{commodityCode} AND freeze_count >= #{count}") int deductFreezeStorage (@Param("commodityCode") String commodityCode, @Param("count") Integer count) ; @Update("UPDATE t_storage SET count = count + #{count}, freeze_count = freeze_count - #{count} " + "WHERE commodity_code = #{commodityCode} AND freeze_count >= #{count}") int unfreezeStorage (@Param("commodityCode") String commodityCode, @Param("count") Integer count) ; }
TccTransactionLogMapper
1 2 3 4 5 6 7 8 9 10 11 12 @Mapper public interface TccTransactionLogMapper { @Select("SELECT xid, branch_id as branchId, state FROM tcc_transaction_log WHERE xid = #{xid} AND branch_id = #{branchId}") TccTransactionLog selectByXidAndBranch (@Param("xid") String xid, @Param("branchId") Long branchId) ; @Insert("INSERT INTO tcc_transaction_log(xid, branch_id, state) VALUES(#{xid}, #{branchId}, #{state})") int insert (TccTransactionLog log) ; @Update("UPDATE tcc_transaction_log SET state = #{targetState} WHERE xid = #{xid} AND branch_id = #{branchId} AND state = #{oldState}") int updateStateWithCas (@Param("xid") String xid, @Param("branchId") Long branchId, @Param("targetState") Integer targetState, @Param("oldState") Integer oldState) ; }
TccTransactionLog
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.zdemo.scloud.store.entity;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @NoArgsConstructor @AllArgsConstructor public class TccTransactionLog { private String xid; private Long branchId; private Integer state; }
账户接口实现层 AccountTccServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 package com.zdemo.scloud.user.service;import com.zdemo.scloud.user.entity.TccTransactionLog;import com.zdemo.scloud.user.mapper.AccountTccMapper;import com.zdemo.scloud.user.mapper.TccTransactionLogMapper;import io.seata.core.context.RootContext;import io.seata.rm.tcc.api.BusinessActionContext;import io.seata.rm.tcc.api.LocalTCC;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.springframework.dao.DuplicateKeyException;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.math.BigDecimal;@Slf4j @Service @LocalTCC public class AccountTccServiceImpl implements AccountTccService { @Resource private AccountTccMapper accountTccMapper; @Resource private TccTransactionLogMapper tccTransactionLogMapper; @Override @Transactional(rollbackFor = Exception.class) public boolean prepareDecrease (BusinessActionContext actionContext, Long userId, BigDecimal money) { String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID(); Long branchId = actionContext != null ? actionContext.getBranchId() : null ; log.info("[Account TCC - Try] 开始预留资金. XID: {}, BranchId: {}, Money: {}" , xid, branchId, money); TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (logExist != null && logExist.getState() == 3 ) { throw new RuntimeException ("[Account TCC - Try] 检测到当前分支事务已发生 Cancel 悬挂,强行终止 Try 动作!" ); } int affectedRows = accountTccMapper.freezeMoney(userId, money); if (affectedRows == 0 ) { throw new RuntimeException ("【账户中心】余额不足或账户异常,冻结资金失败!" ); } TccTransactionLog tccLog = new TccTransactionLog (); tccLog.setXid(xid); tccLog.setBranchId(branchId); tccLog.setState(1 ); tccTransactionLogMapper.insert(tccLog); if (actionContext != null ) { actionContext.addActionContext("userId" , userId); actionContext.addActionContext("money" , money); } return true ; } @Override @Transactional(rollbackFor = Exception.class) public boolean confirm (BusinessActionContext actionContext) { String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID(); Long branchId = actionContext != null ? actionContext.getBranchId() : null ; Object userIdObj = actionContext != null ? actionContext.getActionContext("userId" ) : null ; Object moneyObj = actionContext != null ? actionContext.getActionContext("money" ) : null ; if (userIdObj == null || moneyObj == null ) { log.warn("[Account TCC - Confirm] 上下文参数缺失" ); return false ; } TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (logExist != null && logExist.getState() == 2 ) { log.info("[Account TCC - Confirm] 该分支已提交,幂等放行。" ); return true ; } Long userId = Long.valueOf(userIdObj.toString()); BigDecimal money = new BigDecimal (moneyObj.toString()); int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 2 , 1 ); if (updatedRows > 0 ) { accountTccMapper.deductFreezeMoney(userId, money); log.info("[Account TCC - Confirm] 真正扣款落盘成功,事务完结。" ); return true ; } else { TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (doubleCheck != null && doubleCheck.getState() == 2 ) { log.info("[Account TCC - Confirm] 并发占坑失败,但由于最终状态已为 CONFIRM,允许幂等放行。" ); return true ; } log.error("[Account TCC - Confirm] 状态占坑失败,且未见成功痕迹,要求 TC 触发重试!" ); return false ; } } @Override @Transactional(rollbackFor = Exception.class) public boolean cancel (BusinessActionContext actionContext) { String xid = actionContext != null ? actionContext.getXid() : RootContext.getXID(); Long branchId = actionContext != null ? actionContext.getBranchId() : null ; Object userIdObj = actionContext != null ? actionContext.getActionContext("userId" ) : null ; Object moneyObj = actionContext != null ? actionContext.getActionContext("money" ) : null ; if (userIdObj == null || moneyObj == null ) { log.warn("[Account TCC - Cancel] 上下文参数缺失,极可能一阶段在数据序列化前已超时,触发空回滚兜底。" ); } TccTransactionLog logExist = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (logExist == null ) { try { TccTransactionLog tccLog = new TccTransactionLog (xid, branchId, 3 ); tccTransactionLogMapper.insert(tccLog); log.info("[Account TCC - Cancel] 空回滚防御成功,成功向控制表打入 CANCEL 占位桩。" ); } catch (DuplicateKeyException e) { log.warn("[Account TCC - Cancel] 并发空回滚发生主键冲突,底层锁幂等放行成功。" ); } return true ; } if (logExist.getState() == 3 || logExist.getState() == 2 ) { log.info("[Account TCC - Cancel] 该分支状态为 {},无需重复回滚,幂等放行。" , logExist.getState()); return true ; } if (logExist.getState() == 1 ) { if (userIdObj == null || moneyObj == null ) { log.error("[Account TCC - Cancel] 致命级资损隐患:本地有 Try 记录,但分布式上下文丢失参数,资产无法安全解冻!" ); throw new RuntimeException ("TCC 异步参数丢失,拒绝空回滚,等待 TC 重试或人工介入!" ); } Long userId = Long.valueOf(userIdObj.toString()); BigDecimal money = new BigDecimal (moneyObj.toString()); int updatedRows = tccTransactionLogMapper.updateStateWithCas(xid, branchId, 3 , 1 ); if (updatedRows > 0 ) { accountTccMapper.unfreezeMoney(userId, money); log.info("[Account TCC - Cancel] 真正反向解冻资金成功,分布式事务安全回滚。" ); return true ; } else { TccTransactionLog doubleCheck = tccTransactionLogMapper.selectByXidAndBranch(xid, branchId); if (doubleCheck != null && doubleCheck.getState() == 3 ) { log.info("[Account TCC - Cancel] 并发逆操作冲突,但当前状态已被成功置为 CANCEL,直接放行。" ); return true ; } log.error("[Account TCC - Cancel] CAS 回滚占坑失败,要求 TC 重新下发 Cancel 重试!" ); return false ; } } log.error("[Account TCC - Cancel] 遇到了未知的事务控制表状态: {}" , logExist.getState()); return false ; } }
AccountTccMapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Mapper public interface AccountTccMapper { @Update("UPDATE t_account SET money = money - #{money}, freeze_money = freeze_money + #{money} " + "WHERE user_id = #{userId} AND money >= #{money}") int freezeMoney (@Param("userId") Long userId, @Param("money") BigDecimal money) ; @Update("UPDATE t_account SET freeze_money = freeze_money - #{money} " + "WHERE user_id = #{userId} AND freeze_money >= #{money}") int deductFreezeMoney (@Param("userId") Long userId, @Param("money") BigDecimal money) ; @Update("UPDATE t_account SET money = money + #{money}, freeze_money = freeze_money - #{money} " + "WHERE user_id = #{userId} AND freeze_money >= #{money}") int unfreezeMoney (@Param("userId") Long userId, @Param("money") BigDecimal money) ; int updateStateWithCas (@Param("xid") String xid, @Param("branchId") Long branchId, @Param("targetState") Integer targetState, @Param("oldState") Integer oldState) ;}
TccTransactionLogMapper
1 2 3 4 5 6 7 8 9 10 11 12 @Mapper public interface TccTransactionLogMapper { @Select("SELECT xid, branch_id as branchId, state FROM tcc_transaction_log WHERE xid = #{xid} AND branch_id = #{branchId}") TccTransactionLog selectByXidAndBranch (@Param("xid") String xid, @Param("branchId") Long branchId) ; @Insert("INSERT INTO tcc_transaction_log(xid, branch_id, state) VALUES(#{xid}, #{branchId}, #{state})") int insert (TccTransactionLog log) ; @Update("UPDATE tcc_transaction_log SET state = #{targetState} WHERE xid = #{xid} AND branch_id = #{branchId} AND state = #{oldState}") int updateStateWithCas (@Param("xid") String xid, @Param("branchId") Long branchId, @Param("targetState") Integer targetState, @Param("oldState") Integer oldState) ; }
TccTransactionLog
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.zdemo.scloud.user.entity;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @NoArgsConstructor @AllArgsConstructor public class TccTransactionLog { private String xid; private Long branchId; private Integer state; }
暴露 Feign API 实现 下游微服务提供方在各自的对外 Controller 中暴露该 TCC 的 Try 接口即可:
StorageController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @RestController @RequestMapping("/storage") public class StorageController { @Resource private StorageTccService storageTccService; @PostMapping("/decrease") public Result<Void> decrease (@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) { log.info("▶▶▶ [仓储服务] 收到扣减库存请求. XID: {}" , RootContext.getXID()); storageTccService.prepareDecrease(null , commodityCode, count); return Result.success(null ); } }
AccountController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @RestController @RequestMapping("/account") public class AccountController { @Resource private AccountTccService accountTccService; @PostMapping("/decrease") public Result<Void> decrease (@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) { log.info("▶▶▶ [账户服务] 收到扣减余额请求. XID: {}" , RootContext.getXID()); accountTccService.prepareDecrease(null , userId, money); return Result.success(null ); } }
Order 订单发起方(TM)保持不变。由于 TCC 良好的抽象性,作为全局事务发起者的 OrderService 除了打印日志之外,不需要对全链路做出任何核心代码的修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf4j @Service public class OrderService { @Resource private OrderMapper orderMapper; @Resource private StoreFeignClient storeFeignClient; @Resource private UserFeignClient userFeignClient; @GlobalTransactional(name = "zdemo-create-order-tx", rollbackFor = Exception.class) public void createOrder (OrderDTO orderDto) { log.info("==========================================================================" ); log.info("🏁 触发分布式下单大闸 [TCC 工业级标准实现模式]!XID = {}" , RootContext.getXID()); Order order = new Order (); orderMapper.insert(order); log.info("Step 1 ➔ 本地订单创建成功(一阶段落盘)" ); storeFeignClient.decrease(orderDto.getCommodityCode(), orderDto.getCount()); log.info("Step 2 ➔ 远程微服务[仓储中心] TCC-Try 锁定库存成功!" ); userFeignClient.decrease(orderDto.getUserId(), orderDto.getMoney()); log.info("Step 3 ➔ 远程微服务[账户中心] TCC-Try 锁定资金成功!" ); order.setStatus(1 ); orderMapper.updateById(order); log.info("🎉 Step 4 ➔ 全链路业务预留通过,全局事务准备二阶段自动异步广播 Confirm!" ); log.info("==========================================================================" ); } }
其他说明 TCC 模式需要改配置文件的 data-source-proxy-mode 吗? 其实是不需要的,TCC 模式完全不需要修改 data-source-proxy-mode,甚至根本不依赖 Seata 的数据源代理功能!
data-source-proxy-mode(可选值有 AT、XA)是专门给 AT 模式 和 XA 模式 准备的。哪怕你的 data-source-proxy-mode 配置成了 AT 或者是默认值,它也完全不会干扰 TCC 的运行。当一个接口被标记为 TCC 接口时,Seata 会自动走 TCC 的切面逻辑,直接绕过数据源代理拦截。
AT/XA 模式 属于 “框架接管事务”,Seata 必须把你的普通数据库连接池(如 Hikari、Druid)人肉包装成 DataSourceProxy,进而拦截你的 SQL 去自动生成 undo_log 或发送 XA PREPARE。
TCC 模式 则是完全由业务代码人肉接管事务。一阶段 Try、二阶段 Confirm / Cancel 里面写的都是普通的本地 SQL(利用 Spring 的 @Transactional 提交或回滚)。Seata 在这里只充当一个分布式协调拦截器(通过 AOP 切面记录分支状态),它根本不需要去代理、拦截你的底层数据库连接。
要在项目里顺畅运行 TCC 模式,配置文件(如 application.yml)中只需要确保:
这样Seata 就能扫描到你的 @LocalTCC 注解和 @TwoPhaseBusinessAction 注解,从而利用 AOP 建立分布式调用链路。
AT 与 TCC 的混合部署 混合部署的必要性 在电商分布式大促(如双十一、618)的极限并发场景下,热点商品库存(如 1 元秒杀的 iPhone、爆款国潮大衣)往往是引爆系统雪崩的万恶之源。如果你整个链路全盘采用 Seata AT 模式,由于它依赖数据库的物理排他锁(全局锁),在热点商品全网疯抢时,成千上万的线程会卡在同一个数据库行记录上死等,数据库连接池瞬间被抽干,引发全链路雪崩。这时候的终极解法就是:打破单一模式的银弹幻想,采用 AT 与 TCC 的“混合部署(混合事务)” 对核心链路进行性能松绑。
混合部署的核心思想是:“非核心链路用 AT(买开发效率),核心热点链路用 TCC(买性能)”。 在大促下单的微服务链路中,我们通常这样拆分:
1 2 3 4 5 6 7 8 9 10 [用户下单] │ ├──> 订单服务 (Order Service) ────> 【AT 模式】(生成订单、优惠券,并发不高,业务复杂) │ └──> 库存服务 (Storage Service) ───> 【TCC 模式】(热点商品库存,极限并发,不容卡顿)
混合部署的代码落地 Seata 最强大的一点在于,TM(事务管理器)根本不在乎底层是 AT 还是 TCC,它只认 XID 。
在 Order 下单业务起点挂载全局事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private OrderMapper orderMapper; @Resource private StorageFeignClient storageFeignClient; @Override @GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class) public void createOrder (OrderOrder order) { orderMapper.insert(order); storageFeignClient.prepareDecrease(null , order.getCommodityCode(), order.getCount()); } }
在库存微服务,全面切换为 TCC 架构。我们在库存服务的接口层和实现类上,按照之前打磨好的安全范式进行重构,彻底卸载物理行锁。接口层(向 Seata 注册 TCC 行为):
1 2 3 4 5 6 7 8 9 public interface StorageTccService { @TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "confirm", rollbackMethod = "cancel") boolean prepareDecrease ( BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "count") Integer count ) ;}
实现类(性能松绑的关键),在一阶段 prepareDecrease 中,利用业务账目隔离代替物理死等:
1 2 int affectedRows = storageTccMapper.freezeStorage(commodityCode, count);
执行完这句 SQL 并在 prepareDecrease 返回后,当前线程在数据库里的物理事务已经 Commit 并释放锁了! 如果此时有第二个线程来抢购同一件商品,它不需要等待第一个事务的二阶段决议,直接就可以进去执行第二句 freezeStorage。多个事务并发修改同一行的不同部分(扣减可用、增加冻结),数据库的行锁停留时间从“秒级”缩短到了“毫秒级”!
TCC + 异步缓冲 如果混合部署后,数据库行更新(Row Update)的 TPS 依然达到了物理硬件上限(如单机 MySQL 每秒几千次硬编码更新上限),我们可以基于 TCC 玩出更激进的“热点扣减缓冲”:
一阶段 Try 走 Redis :在 prepareDecrease 里,不直接扣减 MySQL,而是利用 Lua 脚本扣减 Redis 里的热点库存,并在 Redis 里记录冻结痕迹。
异步批量落盘 :本地开启一个定时任务,或者通过消息队列(MQ),把 Redis 里的扣减结果批量合并(例如:把 100 次扣减 1 整合为 1 次扣减 100),然后批量更新 MySQL。
二阶段 Confirm/Cancel :
如果全局成功,Confirm 只负责清除 Redis 里的冻结标记,并标记 MySQL 异步对账完成。
如果全局失败,Cancel 负责把 Redis 里的可用库存吐回去,保证绝对不资损。
收益与代价 在大促复盘时,这种混合部署的架构能够带来质的飞跃:
吞吐量暴涨 :库存模块由于物理锁被“降级”为逻辑状态锁,热点行冲突引起的数据库雪崩彻底消失,热点链路吞吐量通常能获得数倍到十倍的提升。
资产安全无忧 :订单、优惠券等资产依然保留在 AT 模式的强一致性守护下,不需要程序员去人肉为每个微服务手写复杂的 Cancel 补偿逻辑,研发效率得到了兼顾。
代价就是:
针对 TCC,程序员需要处理极其烧脑的高并发边缘场景。 诸如空回滚、幂等、防悬挂、CAS 状态占坑、二阶段参数丢失投毒。稍有不慎,就会在生产环境引爆严重的资金损失或账目对不上。
丧失了全局“读隔离”,引来脏读与超卖体验问题。 AT 模式依赖 Seata 的全局锁。如果事务 A 正在修改库存,事务 B 在 A 没最终提交前,是查不到、也改不了这笔库存的,具有极高的隔离性。TCC 追求最终一致性。在一阶段 Try 完结后,可用库存已经减少,冻结库存已经增加。如果此时全局事务在二阶段翻车了,触发 Cancel 倒车释放。用户在前端会看到“库存已抢光”,但过了 2 秒刷新一看,由于别的事务回滚了,“库存居然又离奇地吐出来了”。这种数据在中间态的“软状态”抖动,会带来一定程度上的用户体验魔幻感。
运维与分布式链路追踪的复杂度翻倍。 在纯 AT 或纯 TCC 架构中,事务的排查路径是单一的。一旦混合部署,当一个分布式事务挂掉时,你必须在日志中同时去抠 AT 的全局锁、Undo Log 镜像回滚轨迹,以及 TCC 的控制流水表状态机演变线。一旦由于突发断电导致 Seata 协调器(TC)大盘死账积压,你既要清理 undo_log 表,又要手动去人肉核对 TCC 流水表里的 state=1 悬挂数据,运维负担极大。
Saga 模式的说明 Saga 模式的简介 Saga 模式是分布式事务领域里的“长跑健壮型选手”。如果说 TCC 是“强迫症式的精细化资产锁定”,那么 Saga 就是“敢作敢当、错了再改的补偿机制”。它不要求你在第一阶段去“冻结”或者“预留”任何资产,而是直接真刀真枪地把业务走出一步,如果后面有人翻车了,它再倒回去执行反向操作。它和 TCC 的核心区别是:TCC 必须要经历 Try 这一步中间态(冻结资产)。而 Saga 没有中间态,一上来 $T_1$ 就直接把库存扣了,把钱划走了。
Saga 的思想非常朴素:它把一个长分布式事务拆分成一系列的 本地局部事务 $T_1, T_2, \dots, T_n$。 每一个正向事务 $T_i$ 都有一个与之对应的逆向补偿事务 $C_i$。它的执行无非就两种结局:
一种是顺风顺水(一路成功):所有正向动作顺利走完,分布式事务安全结案。
$$T_1 \rightarrow T_2 \rightarrow T_3 \dots \rightarrow T_n$$
另一种是突发翻车(中途失败与回滚):假设走到 $T_3$ 时失败了(比如用户卡里的钱不够了或者库存突然空了),Saga 协调器(在 Seata 里叫状态机)就会启动倒车补偿流,先原地停下,然后按照原路反向依次调用补偿动作$C_2, C_1$。
$$T_1 \rightarrow T_2 \rightarrow T_3(\text{失败}) \rightarrow C_2 \rightarrow C_1$$
Saga 模式的落地 在 Seata 中,实现 Saga 并不需要你像 TCC 那样写一堆注解,它是基于 状态机(State Machine) 来玩的。
Seata 提供了一个专门的引擎,你作为开发者,需要编写一个 JSON 配置文件(这个文件被称为状态机定义)。在 JSON 里,你像画流程图一样定义好(可以在 Seata 前端控制台进行设计):
第一步调用哪个微服务的哪个 Service 方法(正向 $T_1$)。
如果这一步失败了,应该调用哪个 Service 方法去冲正(逆向 $C_1$)。
下一步往哪儿走(路由条件)。
此时你的 Java 业务代码会变得极度纯粹,全都是标准的本地业务方法。具体说明:
正向业务方法(库存微服务):
1 2 3 4 5 public void reduceInventory (String commodityCode, Integer count) { int rows = storageMapper.reduceRealStorage(commodityCode, count); if (rows == 0 ) throw new RuntimeException ("库存不足!" ); }
对应的逆向补偿方法(库存微服务):
1 2 3 4 public void compensateInventory (String commodityCode, Integer count) { storageMapper.addRealStorage(commodityCode, count); }
向前还是向后 当分布式事务执行到一半报错时,Seata 的 Saga 状态机引擎支持两种补偿救场策略:
优缺点与生产选型 优势(为什么选它?)
超长链路的救星:TCC 和 AT 模式都需要长久占着数据库连接和锁(从 Try 一直卡到 Confirm),如果微服务链路太长、跨越好几个外部三方系统(比如调用了民航、银联接口),系统会直接卡死。Saga 每个步骤都是本地事务,随用随释放,并发吞吐量极高。
拯救老旧系统(Legacy System):很多遗留系统或外部第三方 API(比如发短信、充话费、出机票),压根不可能为了配合你改造去提供 Try、Confirm 接口。人家就一个 Order 接口(正向)和一个 Refund 退款接口(逆向)。这种情况下,只有 Saga 能带着它们一起玩。
致命痛点(生产必须面对的代价)
由于 Saga 是一上来就直接改了本地数据库(落盘结案),这就会带来可怕的 “脏写 (Dirty Write)” 和 “脏读”。比如,你的机票本来只剩 1 张了。事务 A 进来执行 $T_1$,直接把这张机票买走了(此时事务 A 还在跑后面的步骤,没彻底完结)。 就在这一瞬间,真实的用户去查机票,发现 “卖完了”。 紧接着,事务 A 在 $T_2$ 翻车了,触发 Saga 补偿 $C_1$,又把机票退了回来。 刚刚那个来看机票的用户就会觉得见鬼了——明明前一秒看没票了,后一秒看又有票了。这就是缺乏隔离性带来的体验魔幻。
什么时候掏出 Saga?
能用 AT 模式就用 AT(无侵入,最省心)
对资产、金钱敏感且并发极高的核心链路用 TCC(精细锁定,体验好)
链路长、调用了外部三方系统、或者老旧系统无法改造时,果断上 Saga。
标题:
Spring Cloud Alibaba 微服务系列 - Seata